Perf: Optimize in memory sort #15380
Conversation
let mut current_batches = Vec::new();
let mut current_size = 0;

for batch in std::mem::take(&mut self.in_mem_batches) {
I think it would be nice to use pop (while let Some(batch) = v.pop()) here to remove each batch from the vec once it is sorted, to reduce memory usage. Right now, AFAIK, the batches are retained until after the loop.
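A minimal sketch of the suggestion, generic over the batch type, with a hypothetical sort_one callback standing in for the real per-batch sort:

fn drain_and_sort<T>(mut in_mem_batches: Vec<T>, mut sort_one: impl FnMut(T)) {
    // Popping moves each batch out of the vec, so it is dropped as soon
    // as it has been processed, instead of the whole vec staying alive
    // until the loop finishes.
    while let Some(batch) = in_mem_batches.pop() {
        sort_one(batch); // `batch` is freed here, lowering peak memory
    }
}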
Thank you @Dandandan for the review and the good suggestion; addressed it!
I think this is already looking quite nice. What do you need to finalize this, @zhuqi-lucas?
Thank you @Dandandan for the review. I think the next step is just to add the benchmark results for this PR. It's mergeable as a first version; later we can improve it according to the comments:
@alamb Do we have the CI benchmark running now? If not, I need your help to run it... Thanks a lot! As for sort-tpch itself, I ran it for the improvement result, but I haven't run the other benchmarks.

Previous sort-tpch:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2241.04ms │ 1816.69ms │ +1.23x faster │
│ Q2 │ 1841.01ms │ 1496.73ms │ +1.23x faster │
│ Q3 │ 12755.85ms │ 12770.18ms │ no change │
│ Q4 │ 4433.49ms │ 3278.70ms │ +1.35x faster │
│ Q5 │ 4414.15ms │ 4409.04ms │ no change │
│ Q6 │ 4543.09ms │ 4597.32ms │ no change │
│ Q7 │ 8012.85ms │ 9026.30ms │ 1.13x slower │
│ Q8 │ 6572.37ms │ 6049.51ms │ +1.09x faster │
│ Q9 │ 6734.63ms │ 6345.69ms │ +1.06x faster │
│ Q10 │ 9896.16ms │ 9564.17ms │ no change │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 61444.64ms │
│ Total Time (concat_batches_for_sort) │ 59354.33ms │
│ Average Time (main) │ 6144.46ms │
│ Average Time (concat_batches_for_sort) │ 5935.43ms │
│ Queries Faster │ 5 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 4 │
└────────────────────────────────────────┴────────────┘
Latest result based on the current latest code:

--------------------
Benchmark sort_tpch1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 153.49ms │ 137.57ms │ +1.12x faster │
│ Q2 │ 131.29ms │ 120.93ms │ +1.09x faster │
│ Q3 │ 980.57ms │ 982.22ms │ no change │
│ Q4 │ 252.25ms │ 245.09ms │ no change │
│ Q5 │ 464.81ms │ 449.27ms │ no change │
│ Q6 │ 481.44ms │ 455.45ms │ +1.06x faster │
│ Q7 │ 810.73ms │ 709.74ms │ +1.14x faster │
│ Q8 │ 498.10ms │ 491.12ms │ no change │
│ Q9 │ 503.80ms │ 510.20ms │ no change │
│ Q10 │ 789.02ms │ 706.45ms │ +1.12x faster │
│ Q11 │ 417.39ms │ 411.50ms │ no change │
└──────────────┴──────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 5482.89ms │
│ Total Time (concat_batches_for_sort) │ 5219.53ms │
│ Average Time (main) │ 498.44ms │
│ Average Time (concat_batches_for_sort) │ 474.50ms │
│ Queries Faster │ 5 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 6 │
└────────────────────────────────────────┴───────────┘
--------------------
Benchmark sort_tpch10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2243.52ms │ 1825.64ms │ +1.23x faster │
│ Q2 │ 1842.11ms │ 1639.00ms │ +1.12x faster │
│ Q3 │ 12446.31ms │ 11981.63ms │ no change │
│ Q4 │ 4047.55ms │ 3715.96ms │ +1.09x faster │
│ Q5 │ 4364.46ms │ 4503.51ms │ no change │
│ Q6 │ 4561.01ms │ 4688.31ms │ no change │
│ Q7 │ 8158.01ms │ 7915.54ms │ no change │
│ Q8 │ 6077.40ms │ 5524.08ms │ +1.10x faster │
│ Q9 │ 6347.21ms │ 5853.44ms │ +1.08x faster │
│ Q10 │ 11561.03ms │ 14235.69ms │ 1.23x slower │
│ Q11 │ 6069.42ms │ 5666.77ms │ +1.07x faster │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 67718.04ms │
│ Total Time (concat_batches_for_sort) │ 67549.58ms │
│ Average Time (main) │ 6156.19ms │
│ Average Time (concat_batches_for_sort) │ 6140.87ms │
│ Queries Faster │ 6 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 4 │
└────────────────────────────────────────┴────────────┘
Thanks for sharing the results @zhuqi-lucas, this is really interesting! I think it mainly shows that we should probably try to use more efficient in-memory sorting here (e.g. an arrow kernel that sorts multiple batches) rather than use SortPreservingMergeStream.
I think the SortPreservingMergeStream is about as efficient as we know how to make it. Maybe we can look into what overhead makes concat'ing better 🤔 Any per-stream overhead we can improve in SortPreservingMergeStream would likely flow directly to any query that does sorts.
Hm, that doesn't make much sense, as …
Hm 🤔 ... but that will still take a separate step of sorting the input batches, which, next to the sorting itself, involves a full extra copy using take. I think the most efficient way would be to sort the indices into the arrays in one step, followed by interleave.
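For reference, a hedged sketch of that two-step per-batch path, assuming arrow's sort_to_indices and take kernels on a single column, to show where the extra copy happens:

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::{sort_to_indices, take};
use arrow::error::Result;

// Sorting one batch's column: compute the sort permutation, then `take`
// materializes it; that materialization is the full extra copy.
fn sort_one_column(col: &Int32Array) -> Result<ArrayRef> {
    let indices = sort_to_indices(col, None, None)?;
    take(col, &indices, None)
}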
It seems that when we merge the sorted batches, we are already using interleave to merge via the sorted indices. Here is the code:

/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any batches for which all rows have been yielded to the output
///
/// Returns `None` if no pending rows
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.is_empty() {
return Ok(None);
}
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
Ok(interleave(&arrays, &self.indices)?)
})
.collect::<Result<Vec<_>>>()?;
self.indices.clear();

But in this PR we also concat some batches into one batch. Do you mean we can also use the indices from each batch into one combined batch, just like in the merge phase?
Thanks @alamb for triggering this; it seems stuck.
I mean, theoretically we don't have to merge at all. The merging is useful for sorting streams of data, but I think it is expected that sorting batches first, followed by a custom merge implementation, is slower than a single sorting pass based on Rust std's unstable sort (which is optimized for doing a minimal number of comparisons quickly).
A more complete rationale/explanation of the same idea was written by @2010YOUY01 here: #15375 (comment)
|
I think I got it now, thank you @Dandandan. It means we already have those in-memory batches; we just need to first sort all elements' indices (a 2-level index consisting of (batch_idx, row_idx)). We don't need to construct the StreamingMergeBuilder for the in-memory sort; we can sort in a single pass. Let me try this way and compare the performance!
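A minimal sketch of that single-pass idea, assuming a single Int32 sort key for simplicity (the real code must handle arbitrary sort expressions), using arrow's interleave kernel:

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::interleave;
use arrow::error::Result;

// Sort all rows across all batches in one pass over a 2-level
// (batch_idx, row_idx) index, then materialize once with `interleave`.
fn sort_across_batches(batches: &[Int32Array]) -> Result<ArrayRef> {
    let mut indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, arr)| (0..arr.len()).map(move |r| (b, r)))
        .collect();

    // A single unstable sort over every row; no per-batch sort + merge.
    indices.sort_unstable_by_key(|&(b, r)| batches[b].value(r));

    let refs: Vec<&dyn Array> = batches.iter().map(|a| a as &dyn Array).collect();
    interleave(&refs, &indices)
}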
Very interesting. As a first try, I merged all in-memory batches and did a single sort; some queries become crazy fast and some crazy slow. I think that is because: …

So as a next step, can we try to make the in-memory sort parallel?

--------------------
Benchmark sort_tpch10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2243.52ms │ 1416.52ms │ +1.58x faster │
│ Q2 │ 1842.11ms │ 1096.12ms │ +1.68x faster │
│ Q3 │ 12446.31ms │ 12535.45ms │ no change │
│ Q4 │ 4047.55ms │ 1964.73ms │ +2.06x faster │
│ Q5 │ 4364.46ms │ 5955.70ms │ 1.36x slower │
│ Q6 │ 4561.01ms │ 6275.39ms │ 1.38x slower │
│ Q7 │ 8158.01ms │ 19145.68ms │ 2.35x slower │
│ Q8 │ 6077.40ms │ 5146.80ms │ +1.18x faster │
│ Q9 │ 6347.21ms │ 5544.48ms │ +1.14x faster │
│ Q10 │ 11561.03ms │ 23572.68ms │ 2.04x slower │
│ Q11 │ 6069.42ms │ 4810.88ms │ +1.26x faster │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 67718.04ms │
│ Total Time (concat_batches_for_sort) │ 87464.44ms │
│ Average Time (main) │ 6156.19ms │
│ Average Time (concat_batches_for_sort) │ 7951.31ms │
│ Queries Faster │ 6 │
│ Queries Slower │ 4 │
│ Queries with No Change │ 1 │
└────────────────────────────────────────┴────────────┘

Patch tried:

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 7fd1c2b16..ec3cd89f3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -671,85 +671,14 @@ impl ExternalSorter {
return self.sort_batch_stream(batch, metrics, reservation);
}
- // If less than sort_in_place_threshold_bytes, concatenate and sort in place
- if self.reservation.size() < self.sort_in_place_threshold_bytes {
- // Concatenate memory batches together and sort
- let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
- self.in_mem_batches.clear();
- self.reservation
- .try_resize(get_reserved_byte_for_record_batch(&batch))?;
- let reservation = self.reservation.take();
- return self.sort_batch_stream(batch, metrics, reservation);
- }
-
- let mut merged_batches = Vec::new();
- let mut current_batches = Vec::new();
- let mut current_size = 0;
-
- // Drain in_mem_batches using pop() to release memory earlier.
- // This avoids holding onto the entire vector during iteration.
- // Note:
- // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
- while let Some(batch) = self.in_mem_batches.pop() {
- let batch_size = get_reserved_byte_for_record_batch(&batch);
-
- // If adding this batch would exceed the memory threshold, merge current_batches.
- if current_size + batch_size > self.sort_in_place_threshold_bytes
- && !current_batches.is_empty()
- {
- // Merge accumulated batches into one.
- let merged = concat_batches(&self.schema, &current_batches)?;
- current_batches.clear();
-
- // Update memory reservation.
- self.reservation.try_shrink(current_size)?;
- let merged_size = get_reserved_byte_for_record_batch(&merged);
- self.reservation.try_grow(merged_size)?;
-
- merged_batches.push(merged);
- current_size = 0;
- }
-
- current_batches.push(batch);
- current_size += batch_size;
- }
-
- // Merge any remaining batches after the loop.
- if !current_batches.is_empty() {
- let merged = concat_batches(&self.schema, &current_batches)?;
- self.reservation.try_shrink(current_size)?;
- let merged_size = get_reserved_byte_for_record_batch(&merged);
- self.reservation.try_grow(merged_size)?;
- merged_batches.push(merged);
- }
-
- // Create sorted streams directly without using spawn_buffered.
- // This allows for sorting to happen inline and enables earlier batch drop.
- let streams = merged_batches
- .into_iter()
- .map(|batch| {
- let metrics = self.metrics.baseline.intermediate();
- let reservation = self
- .reservation
- .split(get_reserved_byte_for_record_batch(&batch));
-
- // Sort the batch inline.
- let input = self.sort_batch_stream(batch, metrics, reservation)?;
- Ok(input)
- })
- .collect::<Result<_>>()?;
-
- let expressions: LexOrdering = self.expr.iter().cloned().collect();
-
- StreamingMergeBuilder::new()
- .with_streams(streams)
- .with_schema(Arc::clone(&self.schema))
- .with_expressions(expressions.as_ref())
- .with_metrics(metrics)
- .with_batch_size(self.batch_size)
- .with_fetch(None)
- .with_reservation(self.merge_reservation.new_empty())
- .build()
+ // Because batches are all in memory, we can sort them in place
+ // Concatenate memory batches together and sort
+ let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+ self.in_mem_batches.clear();
+ self.reservation
+ .try_resize(get_reserved_byte_for_record_batch(&batch))?;
+ let reservation = self.reservation.take();
+ self.sort_batch_stream(batch, metrics, reservation)
}
I think for … the core improvements that I think are important are: …
Good explanation.
I see, execute already takes the partition:

fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
In this case, the final merging might become the bottleneck, because SPM does not have internal parallelism either; during the final merge only one core is busy.
Yes, to be clear, I am not arguing for removing SortPreservingMergeExec or sorting in two phases altogether or anything similar; I was just reacting to the idea of adding more parallelism in …
Thank you @2010YOUY01 @Dandandan, it's very interesting. I am thinking:

final_merged_batch_size =
    if partition_cal_size < min_sort_size => min_sort_size
    else if partition_cal_size > max_sort_size => max_sort_size
    else => partition_cal_size

This prevents creating too many small batches (which can fragment merge tasks) or overly large ones. But how can we calculate min_sort_size and max_sort_size?
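In Rust this heuristic is just an integer clamp; the names (partition_cal_size, min_sort_size, max_sort_size) are the hypothetical knobs from the comment above, not existing DataFusion settings:

// Clamp the per-partition size into [min_sort_size, max_sort_size], so we
// create neither too many small merged batches nor an overly large one.
fn final_merged_batch_size(
    partition_cal_size: usize,
    min_sort_size: usize,
    max_sort_size: usize,
) -> usize {
    partition_cal_size.clamp(min_sort_size, max_sort_size)
}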
Yeah, sorry, I had a bug; retriggered.
I wonder if we can skip interleave / copying entirely? Specifically, what if we sorted to indices, as you suggested, but then instead of calling interleave …
🤖: Benchmark completed
Thanks @alamb, it looks promising.
No performance improvement in this benchmark; I believe the benchmark batch size is mostly greater than the sort_in_place threshold, so it will not gain from this PR. sort-tpch 10 should gain performance, but it is not in this benchmark list.
It may be that the tests are unstable, or that the memory / code generated for x86 is different than for aarch64. I'll run the benchmarks again to see if they are reproducible.
🤖: Benchmark completed
🤖: Benchmark completed
I was wondering, maybe we should only add the "concat-arrays-only-instead-of-full-record-batch" optimization and leave the rest as it is (for now)? So don't change the … It might be of smaller significance, but it is more likely to have no or smaller regressions (and we can follow up with better heuristics that work across different machines).
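A hedged sketch of the "concat arrays only" idea, assuming it means concatenating just the sort-key column gathered from each in-memory batch with arrow's concat kernel, instead of concatenating the full RecordBatches:

use arrow::array::{Array, ArrayRef};
use arrow::compute::concat;
use arrow::error::Result;

// Concatenate one sort-key column taken from every in-memory batch;
// the payload columns stay untouched until the sort order is known.
fn concat_sort_key(key_columns: &[ArrayRef]) -> Result<ArrayRef> {
    let refs: Vec<&dyn Array> = key_columns.iter().map(|a| a.as_ref()).collect();
    concat(&refs)
}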
Thank you @alamb, the result still shows a regression for the Linux run.
Good point @Dandandan, let me do the smallest optimization first; I will address it.
Thank you @Dandandan @alamb, …
🤖: Benchmark completed
🤖: Benchmark completed
// Also, we only support sort expressions with less than 3 columns for now. Because from testing, when
// columns > 3, the performance of in-place sort is worse than sort/merge.
// Need to further investigate the performance of in-place sort when columns > 3.
if self.expr.len() <= 2
Shouldn't we remove this self.expr.len() <= 2 here? This wasn't here before.
Thank you @Dandandan for the review; there was a regression when testing the >2-column cases.
Addressed in the latest PR; let's see the result for the smallest changes. Thanks.
Thank you @alamb, sorry that we need to trigger the benchmark again for the latest changes, to see the result of the smallest changes.
Looking at the earlier result …
This is the query …
So it might actually be the case that the changed code is a bit slower for this case. In this query there is only a little data to copy (so concat batches -> concat sort keys doesn't help that much), while maybe the overhead of using interleave_batches is higher 🤔
🤖: Benchmark completed
🤖: Benchmark completed
Which issue does this PR close?
Rationale for this change
Perf: Automatically concat_batches for sort, which improves performance.
It's mergeable as a first version; later we can improve it according to the comments:
#15375 (comment)
What changes are included in this PR?
Perf: Automatically concat_batches for sort, which improves performance.
Are these changes tested?
Yes
Are there any user-facing changes?
No